/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied.  See the License for the
 * specific language governing permissions and limitations
 * under the License.
 */
#include <gtest/gtest.h>
#include <pulsar/Client.h>

#include "PulsarFriend.h"
#include "lib/SharedBuffer.h"

using namespace pulsar;

static std::string lookupUrl = "pulsar://localhost:6650";
static const std::string exampleSchema =
    R"({"type":"record","name":"Example","namespace":"test","fields":[{"name":"a","type":"int"},{"name":"b","type":"int"}]})";

TEST(SchemaTest, testSchema) {
    ClientConfiguration config;
    Client client(lookupUrl);
    Result res;

    Producer producer;
    ProducerConfiguration producerConf;
    producerConf.setSchema(SchemaInfo(AVRO, "Avro", exampleSchema));
    res = client.createProducer("topic-avro", producerConf, producer);
    ASSERT_EQ(res, ResultOk);

    // Check schema version
    ASSERT_FALSE(producer.getSchemaVersion().empty());
    producer.close();

    ASSERT_EQ(ResultOk, res);

    // Creating producer with no schema on same topic should fail
    producerConf.setSchema(SchemaInfo(JSON, "Json", "{}"));
    res = client.createProducer("topic-avro", producerConf, producer);
    ASSERT_EQ(ResultIncompatibleSchema, res);

    // Creating producer with no schema on same topic should failed.
    // Because we set broker config isSchemaValidationEnforced=true
    res = client.createProducer("topic-avro", producer);
    ASSERT_EQ(ResultIncompatibleSchema, res);

    ConsumerConfiguration consumerConf;
    Consumer consumer;
    // Subscribing with no schema will still succeed
    res = client.subscribe("topic-avro", "sub-1", consumerConf, consumer);
    ASSERT_EQ(ResultOk, res);

    // Subscribing with same Avro schema will succeed
    consumerConf.setSchema(SchemaInfo(AVRO, "Avro", exampleSchema));
    res = client.subscribe("topic-avro", "sub-2", consumerConf, consumer);
    ASSERT_EQ(ResultOk, res);

    // Subscribing with different schema type will fail
    consumerConf.setSchema(SchemaInfo(JSON, "Json", "{}"));
    res = client.subscribe("topic-avro", "sub-2", consumerConf, consumer);
    ASSERT_EQ(ResultIncompatibleSchema, res);

    client.close();
}

TEST(SchemaTest, testHasSchemaVersion) {
    Client client(lookupUrl);
    std::string topic = "SchemaTest-HasSchemaVersion";
    SchemaInfo stringSchema(SchemaType::STRING, "String", "");

    Consumer consumer;
    ASSERT_EQ(ResultOk, client.subscribe(topic + "1", "sub", ConsumerConfiguration().setSchema(stringSchema),
                                         consumer));
    Producer batchedProducer;
    ASSERT_EQ(ResultOk, client.createProducer(topic + "1", ProducerConfiguration().setSchema(stringSchema),
                                              batchedProducer));
    Producer nonBatchedProducer;
    ASSERT_EQ(ResultOk, client.createProducer(topic + "1", ProducerConfiguration().setSchema(stringSchema),
                                              nonBatchedProducer));

    ASSERT_EQ(ResultOk, batchedProducer.send(MessageBuilder().setContent("msg-0").build()));
    ASSERT_EQ(ResultOk, nonBatchedProducer.send(MessageBuilder().setContent("msg-1").build()));

    Message msgs[2];
    ASSERT_EQ(ResultOk, consumer.receive(msgs[0], 3000));
    ASSERT_EQ(ResultOk, consumer.receive(msgs[1], 3000));

    std::string schemaVersion(8, '\0');
    ASSERT_EQ(msgs[0].getDataAsString(), "msg-0");
    ASSERT_TRUE(msgs[0].hasSchemaVersion());
    ASSERT_EQ(msgs[0].getSchemaVersion(), schemaVersion);

    ASSERT_EQ(msgs[1].getDataAsString(), "msg-1");
    ASSERT_TRUE(msgs[1].hasSchemaVersion());
    ASSERT_EQ(msgs[1].getSchemaVersion(), schemaVersion);

    client.close();
}

TEST(SchemaTest, testKeyValueSchema) {
    SchemaInfo keySchema(SchemaType::AVRO, "String", exampleSchema);
    SchemaInfo valueSchema(SchemaType::AVRO, "String", exampleSchema);
    SchemaInfo keyValueSchema(keySchema, valueSchema, KeyValueEncodingType::INLINE);
    ASSERT_EQ(keyValueSchema.getSchemaType(), KEY_VALUE);
    ASSERT_EQ(keyValueSchema.getSchema().size(),
              8 + keySchema.getSchema().size() + valueSchema.getSchema().size());
}

TEST(SchemaTest, testKeySchemaIsEmpty) {
    SchemaInfo keySchema(SchemaType::AVRO, "String", "");
    SchemaInfo valueSchema(SchemaType::AVRO, "String", exampleSchema);
    SchemaInfo keyValueSchema(keySchema, valueSchema, KeyValueEncodingType::INLINE);
    ASSERT_EQ(keyValueSchema.getSchemaType(), KEY_VALUE);
    ASSERT_EQ(keyValueSchema.getSchema().size(),
              8 + keySchema.getSchema().size() + valueSchema.getSchema().size());

    SharedBuffer buffer = SharedBuffer::wrap(const_cast<char*>(keyValueSchema.getSchema().c_str()),
                                             keyValueSchema.getSchema().size());
    int keySchemaSize = buffer.readUnsignedInt();
    ASSERT_EQ(keySchemaSize, -1);
    int valueSchemaSize = buffer.readUnsignedInt();
    ASSERT_EQ(valueSchemaSize, valueSchema.getSchema().size());
    std::string valueSchemaStr(buffer.slice(0, valueSchemaSize).data(), valueSchemaSize);
    ASSERT_EQ(valueSchema.getSchema(), valueSchemaStr);
}

TEST(SchemaTest, testValueSchemaIsEmpty) {
    SchemaInfo keySchema(SchemaType::AVRO, "String", exampleSchema);
    SchemaInfo valueSchema(SchemaType::AVRO, "String", "");
    SchemaInfo keyValueSchema(keySchema, valueSchema, KeyValueEncodingType::INLINE);
    ASSERT_EQ(keyValueSchema.getSchemaType(), KEY_VALUE);
    ASSERT_EQ(keyValueSchema.getSchema().size(),
              8 + keySchema.getSchema().size() + valueSchema.getSchema().size());

    SharedBuffer buffer = SharedBuffer::wrap(const_cast<char*>(keyValueSchema.getSchema().c_str()),
                                             keyValueSchema.getSchema().size());
    int keySchemaSize = buffer.readUnsignedInt();
    ASSERT_EQ(keySchemaSize, keySchema.getSchema().size());
    std::string keySchemaStr(buffer.slice(0, keySchemaSize).data(), keySchemaSize);
    ASSERT_EQ(keySchemaStr, keySchema.getSchema());
    buffer.consume(keySchemaSize);
    int valueSchemaSize = buffer.readUnsignedInt();
    ASSERT_EQ(valueSchemaSize, -1);
}

TEST(SchemaTest, testAutoDownloadSchema) {
    const std::string topic = "testAutoPublicSchema" + std::to_string(time(nullptr));
    std::string jsonSchema =
        R"({"type":"record","name":"cpx","fields":[{"name":"re","type":"double"},{"name":"im","type":"double"}]})";
    SchemaInfo schema(JSON, "test-schema", jsonSchema);

    Client client(lookupUrl);

    ConsumerConfiguration consumerConfiguration;
    consumerConfiguration.setSchema(schema);
    Consumer consumer;
    ASSERT_EQ(ResultOk, client.subscribe(topic, "t-sub", consumerConfiguration, consumer));

    ProducerConfiguration producerConfiguration;
    Producer producer;

    auto clientImplPtr = PulsarFriend::getClientImplPtr(client);

    Promise<Result, Producer> promise;
    clientImplPtr->createProducerAsync(topic, producerConfiguration, WaitForCallbackValue<Producer>(promise),
                                       true);
    ASSERT_EQ(ResultOk, promise.getFuture().get(producer));

    Message msg = MessageBuilder().setContent("content").build();
    ASSERT_EQ(ResultOk, producer.send(msg));

    ASSERT_EQ(ResultOk, consumer.receive(msg));
    ASSERT_EQ("content", msg.getDataAsString());
}
